#!/usr/bin/env python
"""Top-K evaluation of stored recurrent-convolutional weights.

This script loads a single file of stored weights as trained by
recurrent_convolutional.py.  It then counts, for every k, how many test clips
have their true artist among the top-k predictions.  The counts are printed
and also written to the file ``rec_correctK``.
"""

from collections import defaultdict
import math
import sys

import numpy as np
import scipy
import scipy.io.wavfile

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv1D, MaxPooling1D, LSTM
from keras.layers import TimeDistributed, BatchNormalization, Activation

######## EDITABLE PARAMETERS #######
# POINT TO TRAINING CSV
train_csv = 'train.csv'
# POINT TO TEST CSV
test_csv = 'test.csv'
# POINT TO FOLDER WITH WAV FILES
wav_directory = 'wavsongs/'
# POINT TO WEIGHTS TO BE LOADED
weights_file = 'rec_epoch_50_weights.hd5'
# NUMBER OF SAMPLES IN THE WAV FILES
nrsamples = 465984
# HOW LARGE THE SEGMENTATION WINDOW SHOULD BE
segmentsize = 59049
# CONTROLS THE WINDOW STRIDE DURING SEGMENTATION
stride = segmentsize
# FILTERS FOR CONVOLUTIONAL LAYERS, ENSURE THERE ARE log(segmentsize)/log(3) FILTER SIZES
convFilters = [64, 64, 128, 128, 128, 128, 128, 128, 256, 256]
# NODES IN LSTM LAYER
LSTMnodes = 512
####################################


class Clip:
    """One row of a train/test CSV.

    ``songinfo`` is a sequence whose first four entries are, in order:
    artist, title, album and the WAV path (relative to ``wav_directory``).
    """

    def __init__(self, songinfo):
        self.artist = songinfo[0]
        self.title = songinfo[1]
        self.album = songinfo[2]
        self.path = songinfo[3]

    def asString(self):
        """Return a one-line human-readable description of the clip."""
        return self.artist + ' - ' + self.title + ', ' + self.album + ', path: ' + self.path


def _ReadSongRows(csv_path):
    """Return the tab-separated fields of every non-blank line in ``csv_path``."""
    rows = []
    with open(csv_path) as handle:
        for line in handle:
            # Drop the line terminator only (LF or CRLF); fields themselves
            # are left untouched.
            line = line.rstrip('\r\n')
            if not line:
                # A blank line carries no clip; skip it instead of crashing
                # on the missing fields.
                continue
            rows.append(line.split('\t'))
    return rows


def GetTrainingAndTestData():
    """Load the training and test clips and build the artist -> class index map.

    Artists are numbered in order of first appearance in ``train_csv``.
    If ``test_csv`` contains an artist that never occurs in the training
    data, a message is printed and the process exits with status 1, since
    such a clip has no class to be evaluated against.

    Returns:
        (artistIndex, clips, testClips): the artist-name -> integer index
        dict, the list of training ``Clip`` objects and the list of test
        ``Clip`` objects.
    """
    artistIndex = dict()

    clips = []
    for songinfo in _ReadSongRows(train_csv):
        artist = songinfo[0]
        if artist not in artistIndex:
            artistIndex[artist] = len(artistIndex)
        clips.append(Clip(songinfo))

    testClips = []
    for songinfo in _ReadSongRows(test_csv):
        newClip = Clip(songinfo)
        if songinfo[0] not in artistIndex:
            print('New artist introduced in test set: ' + newClip.asString())
            # Fatal data error: report failure through the exit status.
            sys.exit(1)
        testClips.append(newClip)

    return artistIndex, clips, testClips


def _SegmentCount(nrsamples, segmentsize, stride):
    """Return the number of segmentation windows taken from each clip.

    NOTE(review): when (nrsamples - segmentsize) is an exact multiple of
    ``stride`` this yields one window fewer than would fit.  The formula is
    left as is because the result is the model's input shape, which
    presumably has to match the training script - confirm before changing.
    """
    return int(math.ceil((nrsamples - segmentsize) / float(stride)))


def GetSegmentedWav(clip, segmentsize, nrsamples, stride):
    """Read the clip's WAV file and cut it into fixed-size windows.

    Args:
        clip: ``Clip`` whose ``path`` is appended to ``wav_directory``.
        segmentsize: number of samples per window.
        nrsamples: nominal number of samples in every WAV file.
        stride: offset in samples between the starts of consecutive windows.

    Returns:
        Array of shape (segments, segmentsize, 1).

    Raises:
        ValueError: if the WAV file holds fewer samples than the windows need.
    """
    (_rate, wavdata) = scipy.io.wavfile.read(wav_directory + clip.path)
    segments = _SegmentCount(nrsamples, segmentsize, stride)

    # Samples consumed up to the end of the last window.
    needed = (segments - 1) * stride + segmentsize if segments > 0 else 0
    if len(wavdata) < needed:
        raise ValueError('%s has %d samples, but %d are needed for %d segments'
                         % (clip.path, len(wavdata), needed, segments))

    segwav = np.zeros((segments, segmentsize))
    for i in range(segments):
        start = i * stride
        segwav[i] = wavdata[start:start + segmentsize]
    return segwav.reshape(segments, segmentsize, 1)


def CreateModel(segmentsize, segments, classes):
    """Build and compile the recurrent-convolutional classifier.

    Every window is passed through the same (time-distributed) 1-D
    convolution stack, the per-window features are fed to an LSTM, and a
    softmax layer over ``classes`` outputs produces the prediction.

    Args:
        segmentsize: number of samples per window.
        segments: number of windows per clip.
        classes: number of output classes (artists).

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = Sequential()

    # First layer: strided convolution (stride 3), no pooling.
    model.add(TimeDistributed(Conv1D(convFilters[0], 3, strides=3),
                              input_shape=(segments, segmentsize, 1)))
    model.add(TimeDistributed(BatchNormalization()))
    model.add(TimeDistributed(Activation('relu')))

    # Remaining layers: 'same' convolution followed by max-pooling of size 3.
    for i in range(1, len(convFilters)):
        model.add(TimeDistributed(Conv1D(convFilters[i], 3, padding='same')))
        model.add(TimeDistributed(BatchNormalization()))
        model.add(TimeDistributed(Activation('relu')))
        model.add(TimeDistributed(MaxPooling1D(3)))

    # After final convolutional layer flatten the output for fully connected recurrent layer
    model.add(TimeDistributed(Flatten()))
    # Add dropout to combat overfitting
    model.add(TimeDistributed(Dropout(0.5)))
    model.add(LSTM(LSTMnodes))
    model.add(Dense(classes))
    model.add(BatchNormalization())
    model.add(Activation('softmax'))
    model.compile('Adam', 'categorical_crossentropy')
    return model


def _TopKCorrectCounts(predictions, labels, nrclasses):
    """Count, for every k, the clips whose true label is in the top-k predictions.

    Args:
        predictions: one row of class scores per clip.
        labels: true class index of each clip, in the same order.
        nrclasses: number of classes (length of the returned array).

    Returns:
        Array ``counts`` of length ``nrclasses`` where ``counts[k]`` is the
        number of clips whose true label is among the ``k + 1``
        highest-scoring classes.
    """
    counts = np.zeros(nrclasses)
    for scores, label in zip(predictions, labels):
        ranking = np.argsort(scores)[::-1]  # class indices, best score first
        hits = np.flatnonzero(ranking == label)
        if hits.size:
            # A label ranked at position r is correct for every k > r.
            counts[hits[0]:] += 1
    return counts


segments = _SegmentCount(nrsamples, segmentsize, stride)

(artistIndex, train, test) = GetTrainingAndTestData()
# Reverse lookup: class index -> artist name.
rArtistIndex = {index: artist for artist, index in artistIndex.items()}

nrtrain = len(train)
nrtest = len(test)
nrartists = len(artistIndex)

testIn = np.array([GetSegmentedWav(clip, segmentsize, nrsamples, stride) for clip in test])
testLabel = np.array([artistIndex[clip.artist] for clip in test])

model = CreateModel(segmentsize, segments, nrartists)
model.load_weights(weights_file)
testOut = model.predict(testIn, batch_size=5)

correctK = _TopKCorrectCounts(testOut, testLabel, nrartists)

# NOTE(review): the original layout was lost; the counts are assumed to be
# printed once, after all clips have been tallied.
print(correctK)
np.savetxt('rec_correctK', correctK, fmt='%d')